博文推荐|整合 Spring 与 Pulsar,在 Java 中构建微服务 |
您所在的位置:网站首页 › Pulsar生产key value › 博文推荐|整合 Spring 与 Pulsar,在 Java 中构建微服务 |
本文翻译自 StreamNative 博客《Spring into Pulsar》,作者 Tim Spann,StreamNative 布道师。 译者简介 姜吉宁,开源爱好者、终生学习者、健身爱好者。Github @jjnnzb[1]。 本文我们来探讨如何在 Java 框架——Spring 中整合 Apache Pulsar。文章阐述如何在 Java 中构建基于 Spring 的微服务。在正文内容开始前,我们先介绍 Spring。Spring 是 Java 生态中鼎鼎有名的技术框架,自诞生已有近 20 年历史。Spring 提供了极为方便的装配与控制机制,极大地降低了构建应用的难度。有了 Spring,开发者无需堆砌非业务相关的重复模板代码。基于 Spring,开发者可以如鱼得水般快速开发微服务应用,包括各类 REST API、Web 应用程序、控制台应用程序等。推荐大家深入研究 Spring。 如果你想基于 Spring 来开发自己的第一个应用,推荐打开官方提供的 Spring Starter 起步链接[2]。借助这个链接,可以不费吹灰之力搭建好脚手架,并在此基础之上编码,实现自己的业务逻辑。 在本文示例中,将展示如何基于 Spring Boot 提供的依赖注入机制,为应用程序接入实例化和已配置的 Apache Pulsar 来生产与消费消息。此外,我还会通过使用 AMQP、Kafka 和 MQTT 发送和接收消息来展示 Apache Pulsar 与其他消息传递协议集成的灵活性。 最后,本文将浅析 Reactive Pulsar。强大的响应式框架 Reactive 是想构建 Spring 响应式应用的开发者们的不二之选。 基于 Spring 和 Pulsar 构建空气质量应用下图展示的是该应用的架构图。如图所示,Apache Pulsar 是该架构图的核心。Pulsar 在其中充当了路由、网关、消息总线和数据分发通道的角色。 ![]() 选择 Apache Pulsar 的核心原因之一是它具备极强的可扩展性,Pulsar 客户端和应用均支持无限扩展,解决了消息存储与分发在扩展性方面的难题。基于该特性,我们无需做额外的复制便能够复用数据。该特性对很多应用场景非常友好,包括基于 Spark 做的 ETL 任务和基于 Flink 做的实时持续 SQL 流分析等。Pulsar 还为 Spring 微服务无缝支持其他语言编写的服务,包括 Go、Python、C#、C++ 和 Node.JS 等。 点击下图查看示例应用演示视频。 有了 Spring Boot Starter 提供的脚手架,我们可以向 Maven build pom 文件中添加一些依赖,或选择用 Gradle。 首先,配置好 Pulsar 相关版本依赖。本文的示例选择 Pulsar 2.10.0[3] 版本及 JDK 11。开发者当下不应坚持 Java 8 版本,因为在不久的未来,JDK 17 将会是官方推荐的标准版本。 11 2.10.0接着,导入 Pulsar 客户端依赖。 org.apache.pulsar pulsar-client ${pulsar.version} org.apache.pulsar pulsar-client-admin ${pulsar.version} org.apache.pulsar pulsar-client-original ${pulsar.version} pom通过以下命令来编译打包: mvn package输入以下命令,运行应用程序: mvn spring-boot:run在配置文件中(application.resources)填充必要值相关配置,以连接到集群,读取应用数据。该文件一般放在 src/main/resources 目录下。 airnowapi.url=${AIRPORTNOWAPIURL} topic.name=persistent://public/default/airquality producer.name=airquality send.timeout=60 security.mode=off pulsar.service.url=pulsar://pulsar1:6650 #security.mode=on #pulsar.service.url=pulsar+ssl://demo.sndemo.snio.cloud:6651 pulsar.oauth2.audience=urn:sn:pulsar:sndemo:demo-cluster pulsar.oauth2.credentials-url=file:///cr/sndemo-tspann.json pulsar.oauth2.issuer-url=https://auth.streamnative.cloud/ server.port=8999 #kafka kafka.bootstrapAddress=pulsar1:9092 kafka.topic.name=airqualitykafka #mqtt mqtt.automaticReconnect=true mqtt.cleanSession=true mqtt.connectionTimeout=60 mqtt.clientId=airquality-MQTT mqtt.hostname=pulsar1 mqtt.port=1883 mqtt.topic=airqualitymqtt #amqp/rabbitmq amqp.server=pulsar1:5672 amqp.topic=amqp-airquality如上所示,配置项 security.mode 和 pulsar.service.url 被注释掉了。这么配置的原因是,我可以灵活地在 StreamNative 托管的云生产环境和本地的开发环境之间切换。同时,我们也可以采用自动化流程或使用环境变量来更好地满足生产环境的需求。airnowapi.url 这个变量配置的是用于访问 Air Now REST 数据流的专用令牌,建议配置到环境变量中。如果你也想使用该数据流,请先注册[4]。 我们现在开始构建应用。第一步,配置连接,连接上 Apache Pulsar 集群。 第二步,我们来新建一个 Spring 配置类,来初始化 Pulsar 客户端。在配置类中,通过 @Value 注解来注入 application.properties 中相关的配置项。 @Configuration public class PulsarConfig { @Value("${pulsar.service.url}") String pulsarUrl; @Value("${security.mode:off}") String securityMode; @Value("${pulsar.oauth2.audience}") String audience; @Value("${pulsar.oauth2.credentials-url}") String credentialsUrl; @Value("${pulsar.oauth2.issuer-url}") String issuerUrl; @Bean public org.apache.pulsar.client.api.PulsarClient pulsarClient() { PulsarClient client = null; if (securityMode.equalsIgnoreCase(OFF)) { try { client = PulsarClient.builder().serviceUrl(pulsarUrl).build(); } catch (PulsarClientException e) { e.printStackTrace(); client = null; } } else { try { try { client = PulsarClient.builder() .serviceUrl(pulsarUrl) .authentication( AuthenticationFactoryOAuth2.clientCredentials(new URL(issuerUrl), new URL(credentialsUrl),audience)).build(); } catch (MalformedURLException e) { e.printStackTrace(); } } catch (PulsarClientException e) { e.printStackTrace(); client = null; } } return client; } }第三步,在服务中配置生产者,代码如下: @Configuration public class PulsarProducerConfig { @Value("${producer.name:producername}") String producerName; @Value("${topic.name:airquality}") String topicName; @Autowired PulsarClient pulsarClient; @Bean public Producer getProducer() { ProducerBuilder producerBuilder = pulsarClient.newProducer(JSONSchema.of(Observation.class)) .topic(topicName) .producerName(producerName).sendTimeout(60, TimeUnit.SECONDS); Producer producer = null; try { producer = producerBuilder.create(); } catch (PulsarClientException e1) { e1.printStackTrace(); } return producer; } }在上述的配置类代码中,我们要构建一个 Pulsar 的生产者,该生产者会使用 Observation 类中的 JSON Schema。该 Observation 类中引入了 FasterXML Jackson 相关注解,但该类实际上就是一个 Java bean,其中记录的是 REST 数据流提供的测量日期、测量时间、状态码、经纬度等信息。 生产者我们添加上相关的业务逻辑代码,随即对接消息平台,测试消息发送流程。完整的源代码在此 Github 仓库。 @Service public class PulsarService { @Autowired PulsarClient pulsarClient; @Autowired ProducerObservation> producer; public MessageId sendObservation(Observation observation) { if (observation == null) { return null; } UUID uuidKey = UUID.randomUUID(); MessageId msgID = null; try { msgID = producer.newMessage() .key(uuidKey.toString()) .value(observation) .send(); } catch (PulsarClientException e) { e.printStackTrace(); } return msgID; } }![]() 消息发送完毕之后,我们可以通过 Spring 读取消息。在本节中,我们会构建一套消费程序测试消费数据。如果要填充一些业务逻辑、做消息路由、将消息转换到一至多个主题中,建议通过 Pulsar Function 来实现(可通过 Java、Python 或 Go 编写),而非 Spring Boot 微服务。我在这里提供了两种实现。Pulsar Spring Boot 消费者的源码在可从此 GitHub 仓库[5]中获取。 如果通过 Java Pulsar Function 来处理空气质量数据,可以参考此 GitHub 仓库[6]中的代码。如以下架构图所示,各 Function、微服务、Spark 和 Flink 任务均可作为整个架构中的组成部分,协调处理实时流数据。 ![]() 我们可以复用生产者中的配置类来连接集群。此外,我们还需要一套消费者的配置代码,该类需要在 application.properties 文件中配置消费者名称、订阅名称、主题名称并注入。在示例代码中,我们配置的订阅类型是 Shared(共享订阅),消费起始点是 Earliest。此外,我们还引入了在 Pulsar 生产者中使用的 Observation 来解析 JSON 数据。 @Configuration public class PulsarConsumerConfig { @Autowired PulsarClient pulsarClient; @Value("${consumer.name:consumerName}") String consumerName; @Value("${topic.name:airquality}") String topicName; @Value("${subscription.name:airqualitysubscription}") String subscriptionName; @Bean public Consumer getConsumer() { Consumer pulsarConsumer = null; ConsumerBuilder consumerBuilder = pulsarClient.newConsumer(JSONSchema.of(Observation.class)) .topic(topicName) .subscriptionName(subscriptionName) .subscriptionType(SubscriptionType.Shared) .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest) .consumerName(consumerName); try { pulsarConsumer = consumerBuilder.subscribe(); } catch (PulsarClientException e) { e.printStackTrace(); } return pulsarConsumer; } }显然,运行消费者程序非常容易。在接收到消息事件之后,进行转换得到普通 Java 对象(Plain Old Java Object,即 POJO),我们可以对数据做任意处理,包括将 Spring 库持久化到数据库、发送到 REST 服务中或存储到文件等。 ![]() ![]() 本文中,我们探索了通过多种消息协议来与 Apache Pulsar 集群通信。由于文章篇幅有限,我们并没有测试全部 Apache Pulsar 支持的消息协议。事实上,我们还可以通过 RocketMQ、WebSocket 来与 Pulsar 集群交互,甚至还可以通过 JDBC 连接到 Pulsar SQL 层(Presto SQL)。 如果你对构建高速的响应式应用感兴趣,推荐试试 Reactive Pulsar 库。Reactive Pulsar 是一款快速高效的库,需要另外一篇单独的文章去介绍,可以点击此链接[7]了解更多信息。 本文列举了基于 Apache Pulsar 和 Spring 来构建应用的要点,包括关键步骤、实战细节等。Pulsar Java 客户端作为 Apache Pulsar 项目中的一等公民,文章借助实例展示了它的强大功能和灵活性。落地实战起来,打造自己的 Pulsar 应用吧! 相关资源 • 源码|空气质量示例代码[8] • 源码|Pulsar 提供的空气质量函数[9] • 源码|空气指令消费者[10] • 源码|FLiPN 空气质量校验[11] • 源码|FLiPN 空气质量 REST[12] • GitHub|Spring Boot Starter for Apache Pulsar[13] • Github|Reactive Pulsar Adapter[14] • 文档|Pulsar Java 客户端[15] 引用链接[1] jjnnzb: https://github.com/jjnnzb [2] 起步链接: https://start.spring.io/ [3] Pulsar 2.10.0: https://pulsar.apache.org/en/release-notes/ [4] 注册: https://docs.airnowapi.org/ [5] GitHub 仓库: https://github.com/tspannhw/airquality [6] GitHub 仓库: https://github.com/tspannhw/pulsar-airquality-function [7] 链接: https://github.com/lhotari/reactive-iot-backend-ApacheCon2021 [8] 源码|空气质量示例代码: https://github.com/tspannhw/airquality [9] 源码|Pulsar 提供的空气质量函数: https://github.com/tspannhw/pulsar-airquality-function [10] 源码|空气指令消费者: https://github.com/tspannhw/airquality-consumer [11] 源码|FLiPN 空气质量校验: https://github.com/tspannhw/FLiPN-AirQuality-Checks [12] 源码|FLiPN 空气质量 REST: https://github.com/tspannhw/FLiPN-AirQuality-REST [13] GitHub|Spring Boot Starter for Apache Pulsar: https://github.com/majusko/pulsar-java-spring-boot-starter [14] Github|Reactive Pulsar Adapter: https://github.com/datastax/reactive-pulsar [15] 文档|Pulsar Java 客户端: https://pulsar.apache.org/docs/en/client-libraries-java/ [16] 如何开发Pulsar Function?: https://pulsar.apache.org/docs/en/functions-develop/ |
今日新闻 |
推荐新闻 |
CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3 |